1   package org.apache.solr.cloud;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import com.google.common.cache.Cache;
21  import com.google.common.cache.CacheBuilder;
22  
23  import org.apache.solr.client.solrj.impl.HttpSolrClient;
24  import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
25  import org.apache.solr.common.SolrException;
26  import org.apache.solr.common.cloud.ClusterState;
27  import org.apache.solr.common.cloud.ClusterStateUtil;
28  import org.apache.solr.common.cloud.DocCollection;
29  import org.apache.solr.common.cloud.Replica;
30  import org.apache.solr.common.cloud.Slice;
31  import org.apache.solr.common.cloud.ZkStateReader;
32  import org.apache.solr.core.CloudConfig;
33  import org.apache.solr.update.UpdateShardHandler;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.slf4j.MDC;
37  
38  import java.io.Closeable;
39  import java.lang.invoke.MethodHandles;
40  import java.util.ArrayList;
41  import java.util.Collection;
42  import java.util.Collections;
43  import java.util.Comparator;
44  import java.util.HashMap;
45  import java.util.HashSet;
46  import java.util.Map;
47  import java.util.Set;
48  import java.util.TreeMap;
49  import java.util.concurrent.Callable;
50  import java.util.concurrent.ExecutorService;
51  import java.util.concurrent.TimeUnit;
52  
53  
// TODO: how to temporarily exclude nodes?
55  
56  // TODO: more fine grained failover rules?
57  
58  // TODO: test with lots of collections
59  
// TODO: add config to only fail over if the number of replicas is < N
61  
62  // TODO: general support for non shared filesystems
63  // this is specialized for a shared file system, but it should
64  // not be much work to generalize
65  
66  // NOTE: using replication can slow down failover if a whole
67  // shard is lost.
68  
69  /**
70   *
71   * In this simple initial implementation we are limited in how quickly we detect
72   * a failure by a worst case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS + WORK_LOOP_DELAY_MS
73   * and best case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS. Also, consider the time to
74   * create the SolrCore, do any recovery necessary, and warm up the readers.
75   * 
76   * NOTE: this will only work with collections created via the collections api because they will have defined
77   * replicationFactor and maxShardsPerNode.
78   * 
79   * @lucene.experimental
80   */
81  public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
82    
83    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
84  
85    private Integer lastClusterStateVersion;
86    
87    private final ExecutorService updateExecutor;
88    private volatile boolean isClosed;
89    private ZkStateReader zkStateReader;
90    private final Cache<String,Long> baseUrlForBadNodes;
91    private Set<String> liveNodes = Collections.EMPTY_SET;
92  
93    private final int workLoopDelay;
94    private final int waitAfterExpiration;
95    
96    public OverseerAutoReplicaFailoverThread(CloudConfig config, ZkStateReader zkStateReader,
97        UpdateShardHandler updateShardHandler) {
98      this.zkStateReader = zkStateReader;
99      
100     this.workLoopDelay = config.getAutoReplicaFailoverWorkLoopDelay();
101     this.waitAfterExpiration = config.getAutoReplicaFailoverWaitAfterExpiration();
102     int badNodeExpiration = config.getAutoReplicaFailoverBadNodeExpiration();
103     
104     log.info(
105         "Starting "
106             + this.getClass().getSimpleName()
107             + " autoReplicaFailoverWorkLoopDelay={} autoReplicaFailoverWaitAfterExpiration={} autoReplicaFailoverBadNodeExpiration={}",
108         workLoopDelay, waitAfterExpiration, badNodeExpiration);
109 
110     baseUrlForBadNodes = CacheBuilder.newBuilder()
111         .concurrencyLevel(1).expireAfterWrite(badNodeExpiration, TimeUnit.MILLISECONDS).build();
112     
113     // TODO: Speed up our work loop when live_nodes changes??
114 
115     updateExecutor = updateShardHandler.getUpdateExecutor();
116 
117     
118     // TODO: perhaps do a health ping periodically to each node (scaryish)
119     // And/OR work on JIRA issue around self health checks (SOLR-5805)
120   }
121   
122   @Override
123   public void run() {
124     
125     while (!this.isClosed) {
126       // work loop
127       log.debug("do " + this.getClass().getSimpleName() + " work loop");
128 
129       // every n, look at state and make add / remove calls
130 
131       try {
132         doWork();
133       } catch (Exception e) {
134         SolrException.log(log, this.getClass().getSimpleName()
135             + " had an error in its thread work loop.", e);
136       }
137       
138       if (!this.isClosed) {
139         try {
140           Thread.sleep(workLoopDelay);
141         } catch (InterruptedException e) {
142           Thread.currentThread().interrupt();
143         }
144       }
145     }
146   }
147   
148   private void doWork() {
149     
150     // TODO: extract to configurable strategy class ??
151     ClusterState clusterState = zkStateReader.getClusterState();
152     //check if we have disabled autoAddReplicas cluster wide
153     String autoAddReplicas = (String) zkStateReader.getClusterProps().get(ZkStateReader.AUTO_ADD_REPLICAS);
154     if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
155       return;
156     }
157     if (clusterState != null) {
158       if (clusterState.getZkClusterStateVersion() != null &&
159           clusterState.getZkClusterStateVersion().equals(lastClusterStateVersion) && baseUrlForBadNodes.size() == 0 &&
160           liveNodes.equals(clusterState.getLiveNodes())) {
161         // nothing has changed, no work to do
162         return;
163       }
164 
165       liveNodes = clusterState.getLiveNodes();
166       lastClusterStateVersion = clusterState.getZkClusterStateVersion();
167       Set<String> collections = clusterState.getCollections();
168       for (final String collection : collections) {
169         log.debug("look at collection={}", collection);
170         DocCollection docCollection = clusterState.getCollection(collection);
171         if (!docCollection.getAutoAddReplicas()) {
172           log.debug("Collection {} is not setup to use autoAddReplicas, skipping..", docCollection.getName());
173           continue;
174         }
175         if (docCollection.getReplicationFactor() == null) {
176           log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName());
177           continue;
178         }
179         log.debug("Found collection, name={} replicationFactor={}", collection, docCollection.getReplicationFactor());
180         
181         Collection<Slice> slices = docCollection.getSlices();
182         for (Slice slice : slices) {
183           if (slice.getState() == Slice.State.ACTIVE) {
184             
185             final Collection<DownReplica> downReplicas = new ArrayList<DownReplica>();
186             
187             int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas);
188             
189             log.debug("collection={} replicationFactor={} goodReplicaCount={}", docCollection.getName(), docCollection.getReplicationFactor(), goodReplicas);
190             
191             if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) {
192               // badReplicaMap.put(collection, badReplicas);
193               processBadReplicas(collection, downReplicas);
194             } else if (goodReplicas > docCollection.getReplicationFactor()) {
195               log.debug("There are too many replicas");
196             }
197           }
198         }
199       }
200      
201     }
202   }
203 
204   private void processBadReplicas(final String collection, final Collection<DownReplica> badReplicas) {
205     for (DownReplica badReplica : badReplicas) {
206       log.debug("process down replica={} from collection={}", badReplica.replica.getName(), collection);
207       String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP);
208       Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl);
209       if (wentBadAtNS == null) {
210         log.warn("Replica {} may need to failover.",
211             badReplica.replica.getName());
212         baseUrlForBadNodes.put(baseUrl, System.nanoTime());
213         
214       } else {
215         
216         long elasped = System.nanoTime() - wentBadAtNS;
217         if (elasped < TimeUnit.NANOSECONDS.convert(waitAfterExpiration, TimeUnit.MILLISECONDS)) {
218           // protect against ZK 'flapping', startup and shutdown
219           log.debug("Looks troublesome...continue. Elapsed={}", elasped + "ns");
220         } else {
221           log.debug("We need to add a replica. Elapsed={}", elasped + "ns");
222           
223           if (addReplica(collection, badReplica)) {
224             baseUrlForBadNodes.invalidate(baseUrl);
225           }
226         }
227       }
228     }
229   }
230 
231   private boolean addReplica(final String collection, DownReplica badReplica) {
232     // first find best home - first strategy, sort by number of cores
233     // hosted where maxCoresPerNode is not violated
234     final String createUrl = getBestCreateUrl(zkStateReader, badReplica);
235     if (createUrl == null) {
236       log.warn("Could not find a node to create new replica on.");
237       return false;
238     }
239     
240     // NOTE: we send the absolute path, which will slightly change
241     // behavior of these cores as they won't respond to changes
242     // in the solr.hdfs.home sys prop as they would have.
243     final String dataDir = badReplica.replica.getStr("dataDir");
244     final String ulogDir = badReplica.replica.getStr("ulogDir");
245     final String coreNodeName = badReplica.replica.getName();
246     if (dataDir != null) {
247       // need an async request - full shard goes down leader election
248       final String coreName = badReplica.replica.getStr(ZkStateReader.CORE_NAME_PROP);
249       log.debug("submit call to {}", createUrl);
250       MDC.put("OverseerAutoReplicaFailoverThread.createUrl", createUrl);
251       try {
252         updateExecutor.submit(new Callable<Boolean>() {
253 
254           @Override
255           public Boolean call() {
256             return createSolrCore(collection, createUrl, dataDir, ulogDir, coreNodeName, coreName);
257           }
258         });
259       } finally {
260         MDC.remove("OverseerAutoReplicaFailoverThread.createUrl");
261       }
262 
263       // wait to see state for core we just created
264       boolean success = ClusterStateUtil.waitToSeeLiveReplica(zkStateReader, collection, coreNodeName, createUrl, 30000);
265       if (!success) {
266         log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate.");
267         return false;
268       }
269       return true;
270     }
271     
272     log.warn("Could not find dataDir or ulogDir in cluster state.");
273     
274     return false;
275   }
276 
277   private static int findDownReplicasInSlice(ClusterState clusterState, DocCollection collection, Slice slice, final Collection<DownReplica> badReplicas) {
278     int goodReplicas = 0;
279     Collection<Replica> replicas = slice.getReplicas();
280     if (replicas != null) {
281       for (Replica replica : replicas) {
282         // on a live node?
283         boolean live = clusterState.liveNodesContain(replica.getNodeName());
284         final Replica.State state = replica.getState();
285         
286         final boolean okayState = state == Replica.State.DOWN
287             || state == Replica.State.RECOVERING
288             || state == Replica.State.ACTIVE;
289         
290         log.debug("Process replica name={} live={} state={}", replica.getName(), live, state.toString());
291         
292         if (live && okayState) {
293           goodReplicas++;
294         } else {
295           DownReplica badReplica = new DownReplica();
296           badReplica.replica = replica;
297           badReplica.slice = slice;
298           badReplica.collection = collection;
299           badReplicas.add(badReplica);
300         }
301       }
302     }
303     log.debug("bad replicas for slice {}", badReplicas);
304     return goodReplicas;
305   }
306   
307   /**
308    * 
309    * @return the best node to replace the badReplica on or null if there is no
310    *         such node
311    */
312   static String getBestCreateUrl(ZkStateReader zkStateReader, DownReplica badReplica) {
313     assert badReplica != null;
314     assert badReplica.collection != null;
315     assert badReplica.slice != null;
316     log.debug("getBestCreateUrl for " + badReplica.replica);
317     Map<String,Counts> counts = new HashMap<String, Counts>();
318     Set<String> unsuitableHosts = new HashSet<String>();
319     
320     Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
321     
322     ClusterState clusterState = zkStateReader.getClusterState();
323     if (clusterState != null) {
324       Set<String> collections = clusterState.getCollections();
325       for (String collection : collections) {
326         log.debug("look at collection {} as possible create candidate", collection); 
327         DocCollection docCollection = clusterState.getCollection(collection);
328         // TODO - only operate on collections with sharedfs failover = true ??
329         Collection<Slice> slices = docCollection.getSlices();
330         for (Slice slice : slices) {
331           // only look at active shards
332           if (slice.getState() == Slice.State.ACTIVE) {
333             log.debug("look at slice {} for collection {} as possible create candidate", slice.getName(), collection); 
334             Collection<Replica> replicas = slice.getReplicas();
335 
336             for (Replica replica : replicas) {
337               liveNodes.remove(replica.getNodeName());
338               String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
339               if (baseUrl.equals(
340                   badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
341                 continue;
342               }
343               // on a live node?
344               log.debug("collection={} nodename={} livenodes={}", collection, replica.getNodeName(), clusterState.getLiveNodes());
345               boolean live = clusterState.liveNodesContain(replica.getNodeName());
346               log.debug("collection={} look at replica {} as possible create candidate, live={}", collection, replica.getName(), live); 
347               if (live) {
348                 Counts cnt = counts.get(baseUrl);
349                 if (cnt == null) {
350                   cnt = new Counts();
351                 }
352                 if (badReplica.collection.getName().equals(collection)) {
353                   cnt.negRankingWeight += 3;
354                   cnt.collectionShardsOnNode += 1;
355                 } else {
356                   cnt.negRankingWeight += 1;
357                 }
358                 if (badReplica.collection.getName().equals(collection) && badReplica.slice.getName().equals(slice.getName())) {
359                   cnt.ourReplicas++;
360                 }
361                 
362                 // TODO: this is collection wide and we want to take into
363                 // account cluster wide - use new cluster sys prop
364                 Integer maxShardsPerNode = badReplica.collection.getMaxShardsPerNode();
365                 if (maxShardsPerNode == null) {
366                   log.warn("maxShardsPerNode is not defined for collection, name=" + badReplica.collection.getName());
367                   maxShardsPerNode = Integer.MAX_VALUE;
368                 }
369                 log.debug("collection={} node={} max shards per node={} potential hosts={}", collection, baseUrl, maxShardsPerNode, cnt);
370                 
371                 Collection<Replica> badSliceReplicas = null;
372                 DocCollection c = clusterState.getCollection(badReplica.collection.getName());
373                 if (c != null) {
374                   Slice s = c.getSlice(badReplica.slice.getName());
375                   if (s != null) {
376                     badSliceReplicas = s.getReplicas();
377                   }
378                 }
379                 boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
380                 if (unsuitableHosts.contains(baseUrl) || alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) {
381                   counts.remove(baseUrl);
382                   unsuitableHosts.add(baseUrl);
383                   log.debug("not a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
384                 } else {
385                   counts.put(baseUrl, cnt);
386                   log.debug("is a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
387                 }
388               }
389             }
390           }
391         }
392       }
393     }
394     
395     for (String node : liveNodes) {
396       counts.put(zkStateReader.getBaseUrlForNodeName(node), new Counts(0, 0));
397     }
398     
399     if (counts.size() == 0) {
400       log.debug("no suitable hosts found for getBestCreateUrl for collection={}", badReplica.collection.getName());
401       return null;
402     }
403     
404     ValueComparator vc = new ValueComparator(counts);
405     Map<String,Counts> sortedCounts = new TreeMap<String, Counts>(vc);
406     sortedCounts.putAll(counts);
407     
408     log.debug("empty nodes={} for collection={}", liveNodes, badReplica.collection.getName());
409     log.debug("sorted hosts={} for collection={}", sortedCounts, badReplica.collection.getName());
410     log.debug("unsuitable hosts={} for collection={}", unsuitableHosts, badReplica.collection.getName());
411     
412     return sortedCounts.keySet().iterator().next();
413   }
414   
415   private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection<Replica> replicas, DownReplica badReplica, String baseUrl) {
416     if (replicas != null) {
417       log.debug("collection={} check if replica already exists on node using replicas {}", badReplica.collection.getName(), getNames(replicas));
418       for (Replica replica : replicas) {
419         final Replica.State state = replica.getState();
420         if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
421             && clusterState.liveNodesContain(replica.getNodeName())
422             && (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) {
423           log.debug("collection={} replica already exists on node, bad replica={}, existing replica={}, node name={}",  badReplica.collection.getName(), badReplica.replica.getName(), replica.getName(), replica.getNodeName());
424           return true;
425         }
426       }
427     }
428     log.debug("collection={} replica does not yet exist on node: {}",  badReplica.collection.getName(), baseUrl);
429     return false;
430   }
431   
432   private static Object getNames(Collection<Replica> replicas) {
433     Set<String> names = new HashSet<>(replicas.size());
434     for (Replica replica : replicas) {
435       names.add(replica.getName());
436     }
437     return names;
438   }
439 
440   private boolean createSolrCore(final String collection,
441       final String createUrl, final String dataDir, final String ulogDir,
442       final String coreNodeName, final String coreName) {
443 
444     try (HttpSolrClient client = new HttpSolrClient(createUrl)) {
445       log.debug("create url={}", createUrl);
446       client.setConnectionTimeout(30000);
447       client.setSoTimeout(60000);
448       Create createCmd = new Create();
449       createCmd.setCollection(collection);
450       createCmd.setCoreNodeName(coreNodeName);
451       // TODO: how do we ensure unique coreName
452       // for now, the collections API will use unique names
453       createCmd.setCoreName(coreName);
454       createCmd.setDataDir(dataDir);
455       createCmd.setUlogDir(ulogDir);
456       client.request(createCmd);
457     } catch (Exception e) {
458       SolrException.log(log, "Exception trying to create new replica on " + createUrl, e);
459       return false;
460     }
461     return true;
462   }
463   
464   private static class ValueComparator implements Comparator<String> {
465     Map<String,Counts> map;
466     
467     public ValueComparator(Map<String,Counts> map) {
468       this.map = map;
469     }
470     
471     public int compare(String a, String b) {
472       if (map.get(a).negRankingWeight >= map.get(b).negRankingWeight) {
473         return 1;
474       } else {
475         return -1;
476       }
477     }
478   }
479   
480   @Override
481   public void close() {
482     isClosed = true;
483   }
484   
485   public boolean isClosed() {
486     return isClosed;
487   }
488   
489   
490   private static class Counts {
491     int collectionShardsOnNode = 0;
492     int negRankingWeight = 0;
493     int ourReplicas = 0;
494     
495     private Counts() {
496       
497     }
498     
499     private Counts(int totalReplicas, int ourReplicas) {
500       this.negRankingWeight = totalReplicas;
501       this.ourReplicas = ourReplicas;
502     }
503     
504     @Override
505     public String toString() {
506       return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount="
507           + ourReplicas + ", collectionShardsOnNode=" + collectionShardsOnNode + "]";
508     }
509   }
510   
511   static class DownReplica {
512     Replica replica;
513     Slice slice;
514     DocCollection collection;
515     
516     @Override
517     public String toString() {
518       return "DownReplica [replica=" + replica.getName() + ", slice="
519           + slice.getName() + ", collection=" + collection.getName() + "]";
520     }
521   }
522   
523 }